fix: read exactly M values from channel #5447
Conversation
I fixed this problem accidentally as well with #5449, but that solves more: on a DATA strategy early return, the subsequent RACE strategy will not have to deal with stale counters. Edit: though it makes sense to give a dedicated error return at the end of the block, even if the assumption should be correct now. That return should never be reached anyway, and a named error could indicate it.
@nugaon I don't fully understand why the buffered channel is needed in the first place. This is an anti-pattern in Go and is against our coding guidelines: channel size should be one or none. If we respected context cancellation and did not have these sorts of returns that block forever on a channel (aka: have a
In fan-in cases like this, where we have an exact number of goroutines, creating the channel with the size of the routines is worth considering because it does not block at all. Alternatively, we could use a WaitGroup and wait for all processes to return.
I am not sure I agree, @nugaon. In your PR a goroutine leak is still possible, and this is just more evidence of why buffered channels are problematic:

```go
for _, i := range m {
	go func(i int) {
		c <- g.fetch(ctx, i, false) // <- since this is in a goroutine, even if the context expired and g.fetch returned, we still try to write to the channel. If there is no reader actively draining the channel to the end, the goroutine tries to write to the channel forever.
	}(i)
}
for completed < len(m) {
	<-c
	completed++
	if g.fetchedCnt.Load() >= int32(g.shardCnt) {
		return nil // <- this can return early, preventing the rest of the channel from being fully drained, and the same goes below. If the channel is not fully drained, the writing goroutines hang forever.
	}
	if g.failedCnt.Load() > int32(allowedErrs) {
		return errStrategyFailed
	}
}
```
janos
left a comment
This PR successfully fixes the deadlock that it describes with the minimal changes to the current implementation.
Since the channel is buffered to exactly the number of goroutines, no goroutine leak would happen as every write to the channel would not block.
There is no need to drain the channel. The channel c and the results sitting in its buffer will be cleaned up by the garbage collector, when all goroutines finish.
But the whole implementation here is questionable, since the allocation of the whole buffered channel can be avoided. The approach taken here makes the getter allocate the channel, but also spawn m goroutines. For maximal speed in finding the replica, a large number of goroutines is preferred, but that does not scale linearly. I would assume there is a concurrency level that is optimal for both retrieval speed and resource utilization.
Just as an example, slightly different code avoids the buffered channel and avoids the deadlock:

```go
c := make(chan error)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for _, i := range m {
	go func(i int) {
		err := g.fetch(ctx, i, false)
		select {
		case c <- err:
		case <-ctx.Done():
		}
	}(i)
}
for range len(m) {
	select {
	case <-c:
		if g.fetchedCnt.Load() >= int32(g.shardCnt) {
			return nil
		}
		if g.failedCnt.Load() > int32(allowedErrs) {
			return errStrategyFailed
		}
	case <-ctx.Done():
		return nil
	}
}
```

And for my preference, I would protect the goroutines with a semaphore.
But as I said, this change is enough for fixing the problem.
Checklist
Description
Read exactly M values from the channel instead of reading "forever", which leads to a deadlock in rare cases that happen mostly in CI.
Open API Spec Version Changes (if applicable)
Motivation and Context (Optional)
`runStrategy` assumes that, while consuming results from `c`, one of the two exit conditions will always become true before reads are exhausted. Because of that assumption, the previous loop used `for range c` (which only terminates when `c` is closed).

Why this can deadlock
In the DATA -> RACE fallback path, some DATA-owned shard fetches may still be finishing while RACE starts.
RACE can include data shard indices whose waits[i] is not closed yet; for those indices, fetch takes the fly=false path and waits on waits[i].
For a successful DATA fetch, the operation order is:
There is a small window between (1) and (2):
`runStrategy` can consume that last value from `c` while `fetchedCnt` may still be stale for that check. If this happens on the last available message in `c`, neither exit condition may trigger in that iteration, and the next read blocks forever (no more writers, channel not closed).

Related Issue (Optional)
Screenshots (if appropriate):
AI Disclosure